﻿using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MessageLib.Helper;
using MessageLib.Model;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace MessageLib.RabbitMq
{
    /// <summary>
    /// 缓存
    /// </summary>
    public class Cache
    {
        public string TopicName { get; set; }
        public bool State { get; set; }
    }

    public class RabbitMqClient
    {


        private readonly ConnectionFactory _factory = new ConnectionFactory
        {
            // HostName = "127.0.0.1",
            HostName = "192.18.2.1",
            Port = 5672,
            // HostName = "10.0.100.132",
            //  HostName = "192.168.2.111",
            UserName = "admin", //用户名
            Password = "123456", //密码



            AutomaticRecoveryEnabled = true
        };
        public event Action<ConnectionState> StateUpdateEvent;
        public event Action<string, byte[]> PublishMessageEvent;

        private ConnectionState _state;
        protected virtual void OnPublishMessageEvent(string topicName, byte[] payload)
        {
            PublishMessageEvent?.Invoke(topicName, payload);
        }

        private IConnection _conn;
        private readonly LogInfo _log;

        /// <summary>
        /// 缓存
        /// </summary>
        private readonly Dictionary<string, Cache> _cache = new Dictionary<string, Cache>();
        /// <summary>
        /// 网络状态
        /// </summary>
        private bool _networkState = false;
        /// <summary>
        /// 心跳时间
        /// </summary>
        private DateTime _heartbeatTime;

        public RabbitMqClient(string ip, int port, LogInfo log)
        {
            _factory.HostName = ip;
            _factory.Port = port;

            //  _factory.RequestedHeartbeat = 120;   //心跳30S
            _factory.AutomaticRecoveryEnabled = true;  //重新连接
            _factory.TopologyRecoveryEnabled = true;
            _factory.NetworkRecoveryInterval = new TimeSpan(0, 2, 0); //两分钟重试连接
            _log = log;
            _heartbeatTime = DateTime.Now;
            try
            {
                Task.Run(() =>
                {
                    while (true)
                    {
                        try
                        {
                            //判断 连接是否正常
                            if (!_networkState || _heartbeatTime.AddMinutes(3) < DateTime.Now)
                            {
                                _networkState = false;
                                foreach (var item in _cache)
                                {
                                    item.Value.State = false;
                                }
                                _conn = _factory.CreateConnection();
                                _networkState = true;
                            }

                            if (_networkState)
                            {
                                foreach (var item in _cache)
                                {
                                    if (!item.Value.State)
                                    {
                                        Subscribe(item.Value.TopicName, item.Value);
                                    }
                                }
                            }
                        }
                        catch (Exception ex)
                        {
                            _log.Error(ex);
                        }
                        Thread.Sleep(1000 * 30);
                    }

                });


                _conn = _factory.CreateConnection();
                _networkState = true;
            }
            catch (Exception ex)
            {
                _log.Error(ex);
            }

        }
        public bool Connect()
        {
            try
            {
                _conn = _factory.CreateConnection();
                _state = ConnectionState.Connected;
                OnStateUpdateEvent(_state);
                _networkState = true;
            }
            catch (Exception ex)
            {
                _log.Error(ex);
                return false;
            }
            return true;
        }
        public bool Push<T>(string topicName, T message, ISerialize serialize, int overdueTime) where T : Message, new()
        {
            try
            {
                using (IModel channel = _conn.CreateModel())
                {
                    IDictionary<String, Object> argss = new Dictionary<String, Object>();
                    argss.Add("vhost", "/");
                    argss.Add("username", "admin");
                    argss.Add("password", "123456");

                    argss.Add("x-message-ttl", overdueTime);
                    //在MQ上定义一个持久化队列，如果名称相同不会重复创建
                    channel.QueueDeclare(topicName, true, false, false, argss);
                    // channel.QueueDeclare(topicName, true, false, false, null);

                    // string message = string.Format("Message_{0}{1}", DateTime.Now, Console.ReadLine());
                    byte[] buffer = Encoding.UTF8.GetBytes(message.ToJson());
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;
                    channel.BasicPublish("", topicName, properties, buffer);
                    // LogEvent("生产", "消息发送成功：" + message); 
                    if (_state != ConnectionState.Connected)
                    {
                        _state = ConnectionState.Connected;
                        OnStateUpdateEvent(_state);
                    }

                }
            }
            catch (Exception ex)
            {
                _state = ConnectionState.Disconnected;
                OnStateUpdateEvent(_state);
                _log.Error(ex);
                return false;
            }


            return true;
        }


        public bool Push<T>(string topicName, T message, ISerialize serialize) where T : Message, new()
        {
            try
            {
                using (IModel channel = _conn.CreateModel())
                {

                    //在MQ上定义一个持久化队列，如果名称相同不会重复创建
                    channel.QueueDeclare(topicName, true, false, false, null);

                    // string message = string.Format("Message_{0}{1}", DateTime.Now, Console.ReadLine());
                    byte[] buffer = Encoding.UTF8.GetBytes(message.ToJson());
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;
                    channel.BasicPublish("", topicName, properties, buffer);
                    // LogEvent("生产", "消息发送成功：" + message); 
                    if (_state != ConnectionState.Connected)
                    {
                        _state = ConnectionState.Connected;
                        OnStateUpdateEvent(_state);
                    }

                }
            }
            catch (Exception ex)
            {
                _state = ConnectionState.Disconnected;
                OnStateUpdateEvent(_state);
                _log.Error(ex);
                return false;
            }


            return true;
        }

        private bool Subscribe(string topicName, int overdueTime, Cache cache)
        {
            try
            {
                Task.Run(() =>
                { 
                    if (_networkState)
                    {
                        try
                        {
                            using (IModel channel = _conn.CreateModel())
                            {

                                IDictionary<String, Object> argss = new Dictionary<String, Object>();
                                argss.Add("vhost", "/");
                                argss.Add("username", "admin");
                                argss.Add("password", "123456");
                                argss.Add("x-message-ttl", overdueTime);
                                //在MQ上定义一个持久化队列，如果名称相同不会重复创建
                                channel.QueueDeclare(topicName, true, false, false, argss);

                                // channel.QueueDeclare(topicName, true, false, false, null); //在MQ上定义一个持久化队列，如果名称相同不会重复创建 申请一个队列 

                                channel.BasicQos(0, 1, false); //输入1，那如果接收一个消息，但是没有应答，则客户端不会收到下一个消息

                                //    LogEvent("消费者", "Listening...");


                                QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); //在队列上定义一个消费者 
                                channel.BasicConsume(topicName, false, consumer); //消费队列，并设置应答模式为程序主动应答 
                                cache.State = true;
                                while (true)
                                {
                                    BasicDeliverEventArgs ea = consumer.Queue.Dequeue(); //阻塞函数，获取队列中的消息
                                    byte[] bytes = ea.Body;
                                    OnPublishMessageEvent(topicName, bytes);
                                    //  string str = Encoding.UTF8.GetString(bytes); 
                                    //                            LogEvent("消费者", "队列消息:" + str.ToString());
                                    //回复确认
                                    channel.BasicAck(ea.DeliveryTag, false);
                                    _heartbeatTime = DateTime.Now;
                                }
                            }
                        }
                        catch (Exception ex)
                        { 
                            _networkState = false;
                            _log.Error(ex);

                        }
                    }
                    // ReSharper disable once FunctionNeverReturns
                });
            }
            catch (Exception ex)
            {
                _log.Error(ex);
                return false;
            }
            return true;
        }

        public bool Subscribe(string topicName, int overdueTime)
        {
            Cache temp;
            if (!_cache.ContainsKey(topicName))
            {
                temp = new Cache() { TopicName = topicName, State = false };
                _cache.Add(topicName, temp);
            }
            else
            {
                temp = _cache[topicName];//
            }
            if (!temp.State)//重复订阅
            {
                return Subscribe(topicName, overdueTime, temp);
            }
            return true;
        }

        private bool Subscribe(string topicName, Cache cache)
        {
            try
            {
                Task.Run(() =>
                {
                    if (_networkState)
                    {
                        try
                        {
                            using (IModel channel = _conn.CreateModel())
                            {

                                channel.QueueDeclare(topicName, true, false, false, null);
                                //在MQ上定义一个持久化队列，如果名称相同不会重复创建 申请一个队列 

                                channel.BasicQos(0, 1, false); //输入1，那如果接收一个消息，但是没有应答，则客户端不会收到下一个消息

                                //    LogEvent("消费者", "Listening...");


                                QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); //在队列上定义一个消费者 
                                channel.BasicConsume(topicName, false, consumer); //消费队列，并设置应答模式为程序主动应答 
                                cache.State = true;
                                while (true)
                                {
                                    BasicDeliverEventArgs ea = consumer.Queue.Dequeue(); //阻塞函数，获取队列中的消息
                                    byte[] bytes = ea.Body;
                                    OnPublishMessageEvent(topicName, bytes);
                                    //  string str = Encoding.UTF8.GetString(bytes); 
                                    //                            LogEvent("消费者", "队列消息:" + str.ToString());
                                    //回复确认
                                    channel.BasicAck(ea.DeliveryTag, false);
                                    _heartbeatTime = DateTime.Now;
                                }
                            }
                        }
                        catch (Exception ex)
                        {
                            _networkState = false;
                            _log.Error(ex);

                        }
                    }
                    // ReSharper disable once FunctionNeverReturns
                });
            }
            catch (Exception ex)
            {
                _log.Error(ex);
                return false;
            }
            return true;
        }



        public bool Subscribe(string topicName)
        {
            Cache temp;
            if (!_cache.ContainsKey(topicName))
            {
                temp = new Cache() { TopicName = topicName, State = false };
                _cache.Add(topicName, temp);
            }
            else
            {
                temp = _cache[topicName];//
            }
            if (!temp.State)//重复订阅
            {
                return Subscribe(topicName, temp);
            }
            return true;


        }


        protected virtual void OnStateUpdateEvent(ConnectionState obj)
        {
            StateUpdateEvent?.Invoke(obj);
        }
    }

}
